初学netty 简单理解
netty的 future 比 java 的future 多了监听器的功能,使的future.get()这个阻塞的形式不是必须。 可以用非阻塞的形式。 但是netty 的future 有两个问题 1、一般要和一个执行器executor关联,也就是要有个线程去异步执行task。 2、 一般只能等操作完成或者出现异常错误才会返回,不能在任务过程中去设置success或者fail,也不能设置返回值。。 第一个问题,要和一个异步task队列相关联,所以netty给我实现了 就用把 第二个问题so Promise解决了第二个问题。 那么也就是说 Promise 可以在任务过程中 有条件的设置success 或者fail 或者cancel。
还有Promise是future的子接口 所以一般可以和futrue相等,而且设置值是线程安全的。
比较常用的Promise 是DefaultPromise。这里简单的做一个udp的需求 有一个操作是udp发一个操作报文,那么5秒中之内得到回应的话 就输出操作成功。 如果不成功每5秒钟 重试一次 重试3次后 如果还失败就输出发送重试没成功。 UdpServer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 public class UdpServer { /** * 启动服务 */ public void bind(int port) { log.info("-------------------------------udpServer-------------------------"); //表示服务器连接监听线程组,专门接受 accept 新的客户端client 连接 EventLoopGroup bossLoopGroup = new NioEventLoopGroup(); HashMap<InetSocketAddress, Channel> map = new LinkedHashMap<>(); try { //1,创建netty bootstrap 启动类 Bootstrap serverBootstrap = new Bootstrap(); //2、设置boostrap 的eventLoopGroup线程组 serverBootstrap = serverBootstrap.group(bossLoopGroup); //3、设置NIO UDP连接通道 serverBootstrap = serverBootstrap.channel(NioDatagramChannel.class); //4、设置通道参数 SO_BROADCAST广播形式 serverBootstrap = serverBootstrap.option(ChannelOption.SO_BROADCAST, true); //5、设置处理类 装配流水线 serverBootstrap = serverBootstrap.handler(new BootNettyUdpSimpleChannelInboundHandler(map)); //6、绑定server,通过调用sync()方法异步阻塞,直到绑定成功 ChannelFuture f = serverBootstrap.bind(port).sync(); f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { channelFuture.isSuccess(); } }); log.info(UdpServer.class.getName() + " started and listend on " + f.channel().localAddress()); /* f.channel().eventLoop().execute(new Runnable() { @Override public void run() { f.channel().writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8), datagramPacket.sender())); } });*/ // ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() << 1); //起了个netty执行器 相当于起了个线程 DefaultEventExecutor executor=new DefaultEventExecutor(); executor.execute(new Runnable() { @Override public void run() { ChannelFuture cf; log.info("位置1 "+String.valueOf(Thread.currentThread().getId())); while (true) { try { sleep(3000); } catch (InterruptedException e) { throw new RuntimeException(e); } if (map.keySet().size() > 0) { Channel c=map.entrySet().iterator().next().getValue(); InetSocketAddress add=map.entrySet().iterator().next().getKey(); cf= c.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8), add)); cf.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { if(future.isSuccess()){ //懒得用其他结构了 直接在channel里的attr 里设置个状态1 发出去还没回应,2 发出去回应了。 AttributeKey<Integer> bechoAttr = AttributeKey.valueOf("becho"); Attribute<Integer> bechoA=c.attr(bechoAttr); bechoA.set(1); String attrKey="becho"; int attrValue=2; //自己写了个DefaultPromise结果 EchoWrite ew=new EchoWrite(); DefaultPromise tof =ew.echoWrite("echorequest",add,c, attrKey,attrValue,3,5000); tof.addListener(new GenericFutureListener<Future<? super Boolean>>() { @Override public void operationComplete(Future<? super Boolean> future) { if(future.isSuccess()){ log.info("成功回应"); } else {log.info("好几次没回应"); } } }); } } }); break; } // else continue; } } }); //7、监听通道关闭事件,应用程序会一直等待,直到channel关闭 f.channel().closeFuture().sync(); }catch(Exception e){ // TODO: handle exception }finally{ System.out.println("netty udp close!"); //8 关闭EventLoopGroup, bossLoopGroup.shutdownGracefully(); } } }
BootNettyUdpSimpleChannelInboundHandler类服务端消息处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class BootNettyUdpSimpleChannelInboundHandler extends SimpleChannelInboundHandler<DatagramPacket> { HashMap<InetSocketAddress, Channel> map; public BootNettyUdpSimpleChannelInboundHandler(HashMap<InetSocketAddress, Channel> map) { this.map = map; } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket) throws Exception { try { log.info("位置2 "+String.valueOf(Thread.currentThread().getId())); String strdata = datagramPacket.content().toString(CharsetUtil.UTF_8); if(strdata.equals("echo")){ //得到的消息如果是echo 那么设置状态2 AttributeKey<Integer> bechoAttr = AttributeKey.valueOf("becho"); Attribute<Integer> bechoA=channelHandlerContext.channel().attr(bechoAttr); bechoA.set(2); } //打印收到的消息 log.info("---------------------receive data--------------------------"); InetSocketAddress remoteAddress = datagramPacket.sender(); String ip = remoteAddress.getAddress().getHostAddress(); int port = remoteAddress.getPort(); map.put(remoteAddress,channelHandlerContext.channel()); log.info(ip+" "+port); log.info(strdata); log.info("---------------------receive data--------------------------"); //收到udp消息后,可通过此方式原路返回的方式返回消息,例如返回时间戳 // channelHandlerContext.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8), datagramPacket.sender())); } catch (Exception e) { } }
做了一个通用的EchoWrite 辅助类 这个类 实现超时重传。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class EchoWrite { EventExecutor e; DefaultPromise t; public EchoWrite() { this.e = new DefaultEventExecutor(); t=new DefaultPromise(this.e); } public <T> DefaultPromise echoWrite(String msg, InetSocketAddress add ,Channel c,String attrKey ,T attrValue, int times, int timeout) { e.submit(new Runnable() { @Override public void run() { int times1 = 0; log.info("位置3 "+String.valueOf(Thread.currentThread().getId())); c.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8), add)); log.info("发出echo"); AttributeKey<Integer> bechoAttr = AttributeKey.valueOf(attrKey); Attribute<Integer> bechoA = c.attr(bechoAttr); while (times1 < times) { try { sleep(timeout); } catch (InterruptedException e) { throw new RuntimeException(e); } bechoA = c.attr(bechoAttr); if (bechoA.get() == attrValue) { t.setSuccess(true); break; } else { c.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8), add)); times1++; log.info("发出echo"); continue; } } if (times1 == times) { Exception ex = new Exception("超时没有回应"); t.setFailure(ex); } } }); return t; } }
客户端就简单了 只需给服务端发一个报文,让服务端得到channel ,还有就会收到echorequest 发回echo就行了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public class UdpClient { public static void main(String[] args) throws Exception{ int port = 51000; if(args != null && args.length > 0){ port = Integer.valueOf(args[0]); } new UdpClient().run(port); } public void bindclient(){ int port = 51000; try { run(port); } catch (Exception e) { throw new RuntimeException(e); } } public void run(int port) throws Exception{ EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(eventLoopGroup) .channel(NioDatagramChannel.class) .option(ChannelOption.SO_BROADCAST, true) .handler(new UdpClientHandler()); Channel channel = bootstrap.bind(0).sync().channel(); AttributeKey<Integer> timesAttrKey = AttributeKey.valueOf("times"); Attribute<Integer> timeA=channel.attr(timesAttrKey); timeA.set(0); //向网段内所有机器广播 channel.writeAndFlush( new DatagramPacket( Unpooled.copiedBuffer("发个测试信息", CharsetUtil.UTF_8), // new InetSocketAddress("255.255.255.255", port) new InetSocketAddress("127.0.0.1", port) ) ).sync(); //客户端等待15s用于接收服务端的应答消息,然后退出并释放资源 // while(true); //这里注意 如果注释掉 那么客户端只会给服务器发个消息 不会回应echorequest 如果不注释 那么会发回应echo。 }catch (Exception e){ e.printStackTrace(); }finally { eventLoopGroup.shutdownGracefully(); } } }
客户端消息处理handler
1 2 3 4 5 6 7 8 9 10 public class UdpClientHandler extends SimpleChannelInboundHandler<DatagramPacket> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket packet) throws Exception { String resp = packet.content().toString(CharsetUtil.UTF_8); System.out.println("客户端接收结果:" + resp); //channelHandlerContext.close(); channelHandlerContext.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("echo", CharsetUtil.UTF_8), packet.sender())); } }
可能用定时器实现更方便 ,这里就是简单的sleep了 回头定时器任务试试。